while (rclcpp::ok(this->context_) && spinning.load()) { /*struct AnyExecutable { RCLCPP_PUBLIC AnyExecutable(); RCLCPP_PUBLIC virtual ~AnyExecutable(); // Only one of the following pointers will be set. rclcpp::SubscriptionBase::SharedPtr subscription; // 话题订阅 rclcpp::TimerBase::SharedPtr timer; // 定时器 rclcpp::ServiceBase::SharedPtr service; // 服务端 rclcpp::ClientBase::SharedPtr client; // 客户端 rclcpp::Waitable::SharedPtr waitable; // These are used to keep the scope on the containing items rclcpp::CallbackGroup::SharedPtr callback_group; rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base; std::shared_ptr<void> data; }; */ rclcpp::AnyExecutable any_executable;
classDualThreadedNode : public rclcpp::Node { public: DualThreadedNode() : Node("DualThreadedNode") { /* These define the callback groups * They don't really do much on their own, but they have to exist in order to * assign callbacks to them. They're also what the executor looks for when trying to run multiple threads */ callback_group_subscriber1_ = this->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive); callback_group_subscriber2_ = this->create_callback_group( rclcpp::CallbackGroupType::MutuallyExclusive);
// Each of these callback groups is basically a thread // Everything assigned to one of them gets bundled into the same thread auto sub1_opt = rclcpp::SubscriptionOptions(); sub1_opt.callback_group = callback_group_subscriber1_; auto sub2_opt = rclcpp::SubscriptionOptions(); sub2_opt.callback_group = callback_group_subscriber2_;
subscription1_ = this->create_subscription<std_msgs::msg::String>( "topic", rclcpp::QoS(10), // std::bind is sort of C++'s way of passing a function // If you're used to function-passing, skip these comments std::bind( &DualThreadedNode::subscriber1_cb, // First parameter is a reference to the function this, // What the function should be bound to std::placeholders::_1), // At this point we're not positive of all the // parameters being passed // So we just put a generic placeholder // into the binder // (since we know we need ONE parameter) sub1_opt); // This is where we set the callback group. // This subscription will run with callback group subscriber1
/** * Every time the Publisher publishes something, all subscribers to the topic get poked * This function gets called when Subscriber1 is poked (due to the std::bind we used when defining it) */ voidsubscriber1_cb(const std_msgs::msg::String::ConstSharedPtr msg) { auto message_received_at = timing_string();
// Extract current thread RCLCPP_INFO( this->get_logger(), "THREAD %s => Heard '%s' at %s", string_thread_id().c_str(), msg->data.c_str(), message_received_at.c_str()); }
/** * This function gets called when Subscriber2 is poked * Since it's running on a separate thread than Subscriber 1, it will run at (more-or-less) the same time! */ voidsubscriber2_cb(const std_msgs::msg::String::ConstSharedPtr msg) { auto message_received_at = timing_string();
// You MUST use the MultiThreadedExecutor to use, well, multiple threads rclcpp::executors::MultiThreadedExecutor executor; auto pubnode = std::make_shared<PublisherNode>(); auto subnode = std::make_shared<DualThreadedNode>();
在官网的说明,对于Reentrant Callback Group中有一句话:This means that, in addition to different callbacks being run parallel to each other, different instances of the same callback may also be executed concurrently.
if (has_invalid_weak_groups_or_nodes) { // 无效的group和node处理,不关心 //..... }
// 清空wait_set,注意这里调用的是rcl(ROS Client Support Library)库,具体底层暂时不关心 rcl_ret_t ret = rcl_wait_set_clear(&wait_set_); if (ret != RCL_RET_OK) { throw_from_rcl_error(ret, "Couldn't clear wait set"); }
// 根据收集的订阅、定时器等,重新申请wait_set内部数据结构内存 // 奇怪的是:这里传递的size参数都没用,直接从底层确定订阅、定时器等数量 ret = rcl_wait_set_resize( &wait_set_, memory_strategy_->number_of_ready_subscriptions(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), memory_strategy_->number_of_ready_events()); if (RCL_RET_OK != ret) { throw_from_rcl_error(ret, "Couldn't resize the wait set"); }
// 把需要等待的订阅、定时器、客户端、服务端对象加入到待等待集合,并设置好rmw底层数据结构 // 函数会填充类似wait_set->impl->rmw_type.type[i]数据结构(一堆宏函数) if (!memory_strategy_->add_handles_to_wait_set(&wait_set_)) { throw std::runtime_error("Couldn't fill wait set"); } }
// 等待,内部调用了rmw_wait等待 // 函数会根据消息有无、定时器是否到期等,填充类似wait_set->impl->rmw_type.types[i]设置 rcl_ret_t status = rcl_wait(&wait_set_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count()); if (status == RCL_RET_WAIT_SET_EMPTY) { RCUTILS_LOG_WARN_NAMED( "rclcpp", "empty wait set received in rcl_wait(). This should never happen."); } elseif (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { using rclcpp::exceptions::throw_from_rcl_error; throw_from_rcl_error(status, "rcl_wait() failed"); }
// check the null handles in the wait set and remove them from the handles in memory strategy // for callback-based entities std::lock_guard<std::mutex> guard(mutex_); // wait_set_没有准备好的订阅、定时器等对象会是NULL,会在这里被从Executor的Handles数组清除,没有被清除的元素都是有效的请求 // 在后面做2)take的时候,直接对比的是Executor的Handles数据结构和节点中的订阅、定时器等,确定对象是否有效 memory_strategy_->remove_null_handles(&wait_set_); }
size_t i; for (i = 0; i < wait_set->impl->timer_index; ++i) { if (!wait_set->timers[i]) { continue; } bool is_ready = false; rcl_ret_t ret = rcl_timer_is_ready(wait_set->timers[i], &is_ready); if (ret != RCL_RET_OK) { return ret; // The rcl error state should already be set. } if (!is_ready) { wait_set->timers[i] = NULL; } }
//.....
// Set corresponding rcl subscription handles NULL. for (i = 0; i < wait_set->size_of_subscriptions; ++i) { bool is_ready = wait_set->impl->rmw_subscriptions.subscribers[i] != NULL; if (!is_ready) { wait_set->subscriptions[i] = NULL; } }
boolExecutor::get_next_ready_executable_from_map( AnyExecutable & any_executable, const rclcpp::memory_strategy::MemoryStrategy::WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { TRACEPOINT(rclcpp_executor_get_next_ready); bool success = false; std::lock_guard<std::mutex> guard{mutex_}; // Check the timers to see if there are any that are ready memory_strategy_->get_next_timer(any_executable, weak_groups_to_nodes); if (any_executable.timer) { success = true; } if (!success) { // Check the subscriptions to see if there are any that are ready memory_strategy_->get_next_subscription(any_executable, weak_groups_to_nodes); if (any_executable.subscription) { success = true; } } if (!success) { // Check the services to see if there are any that are ready memory_strategy_->get_next_service(any_executable, weak_groups_to_nodes); if (any_executable.service) { success = true; } } if (!success) { // Check the clients to see if there are any that are ready memory_strategy_->get_next_client(any_executable, weak_groups_to_nodes); if (any_executable.client) { success = true; } } if (!success) { // Check the waitables to see if there are any that are ready memory_strategy_->get_next_waitable(any_executable, weak_groups_to_nodes); if (any_executable.waitable) { any_executable.data = any_executable.waitable->take_data(); success = true; } } // At this point any_executable should be valid with either a valid subscription // or a valid timer, or it should be a null shared_ptr if (success) { rclcpp::CallbackGroup::WeakPtr weak_group_ptr = any_executable.callback_group; auto iter = weak_groups_to_nodes.find(weak_group_ptr); if (iter == weak_groups_to_nodes.end()) { success = false; } }
if (success) { // If it is valid, check to see if the group is mutually exclusive or // not, then mark it accordingly ..Check if the callback_group belongs to this executor if (any_executable.callback_group && any_executable.callback_group->type() == CallbackGroupType::MutuallyExclusive) { // It should not have been taken otherwise assert(any_executable.callback_group->can_be_taken_from().load()); // Set to false to indicate something is being run from this group // This is reset to true either when the any_executable is executed or when the // any_executable is destructued any_executable.callback_group->can_be_taken_from().store(false); } } // If there is no ready executable, return false return success; }
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp voidget_next_subscription( rclcpp::AnyExecutable & any_exec, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)override { auto it = subscription_handles_.begin(); while (it != subscription_handles_.end()) { auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes); if (subscription) { // Find the group for this handle and see if it can be serviced auto group = get_group_by_subscription(subscription, weak_groups_to_nodes); if (!group) { // Group was not found, meaning the subscription is not valid... // Remove it from the ready list and continue looking it = subscription_handles_.erase(it); continue; } // !!这里很重要,回调组的属性由这里体现 if (!group->can_be_taken_from().load()) { // Group is mutually exclusive and is being used, so skip it for now // Leave it to be checked next time, but continue searching ++it; continue; } // Otherwise it is safe to set and return the any_exec any_exec.subscription = subscription; any_exec.callback_group = group; any_exec.node_base = get_node_by_group(group, weak_groups_to_nodes); subscription_handles_.erase(it); return; } // Else, the subscription is no longer valid, remove it and continue it = subscription_handles_.erase(it); } }
// src/ros2/rclcpp/rclcpp/src/rclcpp/executor.cpp voidExecutor::execute_any_executable(AnyExecutable & any_exec) { if (!spinning.load()) { return; } if (any_exec.timer) { TRACEPOINT( rclcpp_executor_execute, static_cast<constvoid *>(any_exec.timer->get_timer_handle().get())); execute_timer(any_exec.timer); } if (any_exec.subscription) { TRACEPOINT( rclcpp_executor_execute, static_cast<constvoid *>(any_exec.subscription->get_subscription_handle().get())); execute_subscription(any_exec.subscription); } if (any_exec.service) { execute_service(any_exec.service); } if (any_exec.client) { execute_client(any_exec.client); } if (any_exec.waitable) { any_exec.waitable->execute(any_exec.data); }
// !!! 这里很重要 any_exec.callback_group->can_be_taken_from().store(true); // Wake the wait, because it may need to be recalculated or work that // was previously blocked is now available. try { interrupt_guard_condition_.trigger(); } catch (const rclcpp::exceptions::RCLError & ex) { throw std::runtime_error( std::string( "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); } }
voidget_next_subscription( rclcpp::AnyExecutable & any_exec, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes)override { auto it = subscription_handles_.begin(); while (it != subscription_handles_.end()) { auto subscription = get_subscription_by_handle(*it, weak_groups_to_nodes); if (subscription) { auto group = get_group_by_subscription(subscription, weak_groups_to_nodes); if (!group->can_be_taken_from().load()) { ++it; continue; }